草庐IT

flink 流批

全部标签

Flink自定义source(单并行度和多并行度)

文章目录1.Source简介2.Flink预定义的Source3.自定义单并行度Source4.自定义多并行度SourceDataStream是Flink的较低级API,用于进行数据的实时处理任务,可以将该编程模型分为Source、Transformation、Sink三个部分,如下图所示。本文来介绍常用的并行度Source和多并行度Source。1.Source简介source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。flink提供了大量的已经实现好的source方法

Iceberg从入门到精通系列之九:flink sql修改Iceberg表和删除Iceberg表

Iceberg从入门到精通系列之九:flinksql修改Iceberg表一、修改表属性二、修改表名三、删除表一、修改表属性ALTERTABLE`hive_catalog`.`default`.`sample`SET('write.format.default'='avro');二、修改表名ALTERTABLE`hive_catalog`.`default`.`sample`RENAMETO`hive_catalog`.`default`.`new_sample`;三、删除表DROPTABLE`hive_catalog`.`default`.`sample`;

flink程序在消费kafka数据时出现Error sending fetch request问题

1.问题背景在程序已经稳定运行多天、未对代码做任何修改、查看所消费数据源未出现数据增多的情况下,有一个flink程序最近出现了积压问题,很是疑惑,观察几天并查看了日志发现,每当出现加压时便会伴随该日志出现,因此便着手解决该问题。2.解决问题在网上搜索一番后,同时看了kafka配置方面的内容,就修改了如下两个配置session.timeout.ms=30000增加至60000;request.timeout.ms=20000增加至40000;当时确实起作用了,不再出现积压,也不会再出现这样的日志,可是过了一段时间后又出现了积压并伴随该日志出现,于是又分别将上述量配置增加至80000和40000,

flink程序在消费kafka数据时出现Error sending fetch request问题

1.问题背景在程序已经稳定运行多天、未对代码做任何修改、查看所消费数据源未出现数据增多的情况下,有一个flink程序最近出现了积压问题,很是疑惑,观察几天并查看了日志发现,每当出现加压时便会伴随该日志出现,因此便着手解决该问题。2.解决问题在网上搜索一番后,同时看了kafka配置方面的内容,就修改了如下两个配置session.timeout.ms=30000增加至60000;request.timeout.ms=20000增加至40000;当时确实起作用了,不再出现积压,也不会再出现这样的日志,可是过了一段时间后又出现了积压并伴随该日志出现,于是又分别将上述量配置增加至80000和40000,

Flink 状态一致性

Flink状态一致性端到端精确一次输入端输出端预写日志两阶段提交状态一致性有三种级别:最多一次(AT-MOST-ONCE):只处理一次,遇到故障就会丢失,优点:处理快至少一次(AT-LEAST-ONCE):不会丢失数据,但存在重复数据精确一次(EXACTLY-ONCE):不会丢失数据,也不会重复数据实现要求:端到端(end-to-end)的状态一致性:数据源、流处理器、外部存储系统都要有保证机制at-least-once级别:数据源能重放数据端到端精确一次端到端精确一次(end-to-endexactly-once)的关键点:输入端:数据能重放数据(如:Kafka)Flink靠检查点机制,能实

flink时间窗口无新的数据进来最后一个窗口不关闭

测试反馈,配置的flink任务提交上去后,输入数据源符合条件,到时间窗口的size。最后一个窗口没有闭窗计算,数据并没及时输出告警经过调试发现,watermark没有向后继续推进,导致无法闭窗,watermark的时间取的是数据中的业务时间,create_time。因为没有后续数据进来,所以watermark一直停在收到的最后一条数据的时间,,按照官网的watermark的实现:inputStream.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(newEventWaterMark(window)).withTimes

Flink读写Doris操作介绍

Flink读写Doris操作介绍​FlinkDorisConnector可以支持通过Flink操作(读取、插入、修改、删除)Doris中存储的数据。可以将Doris表映射为DataStream或者Table。Flink操作Doris修改和删除只支持在UniqueKey模型上1.准备开发环境pom.xml加入依赖dependency>groupId>org.apache.dorisgroupId>artifactId>flink-doris-connector-1.13_2.12artifactId>version>1.0.3version>dependency>创建测试库测试表--切测试库us

Flink 学习十 FlinkSQL

Flink学习十FlinkSQL1.FlinkSQL基础概念flinksql基于flinkcore,使用sql语义方便快捷的进行结构化数据处理的上层库;类似理解sparksql和sparkcore,hive和mapreduce1.1工作流程整体架构和工作流程数据流,绑定元数据schema,注册成catalog中的表table/view用户使用tableApi/tablesql来表达计算逻辑table-planner利用apachecalcite进行sql语法解析,绑定元数据得到逻辑执行计划再由Optimizer进行优化,得到物理执行计划物理计划经过代码生成器生成代码.得到transformat

flink-sql读写hive-1.13

1.版本说明本文档内容基于flink-1.13.x,其他版本的整理,请查看本人博客的flink专栏其他文章。1.1.概述ApacheHive已经成为了数据仓库生态系统中的核心。它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样也是一个数据管理平台,可用于发现,定义,和演化数据。Flink与Hive的集成包含两个层面。一是利用了Hive的MetaStore作为持久化的Catalog,用户可通过HiveCatalog将不同会话中的Flink元数据存储到HiveMetastore中。例如,用户可以使用HiveCatalog将Kafka表或Elasticsearch表存储在HiveMetast

Flink CDC实现一个Job同步多个表

直接使用FlinkCDCSQL的写法,一个Job只能同步一个表的数据,至于原因,在此不再赘述。直接上代码吧第一步,自定义DebeziumDeserializationSchema将SourceRecord类转化为自定义的JsonRecord类型publicclassJsonStringDebeziumDeserializationSchemaimplementsDebeziumDeserializationSchema{@Overridepublicvoiddeserialize(SourceRecordrecord,Collectorout)throwsException{Envelope.